/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
#define USING_LOG_PREFIX COMMON
#include "deps/oblib/src/common/object/ob_obj_type.h"
#include "test_op_engine.h"
#include "ob_fake_table_scan_vec_op.h"
#include "data_generator.h"

namespace oceanbase
{
namespace sql
{
int ObFakeTableScanVecOp::inner_open()
{
  ObDataGenerator::get_instance().register_op(this);
  std::string round;
  ObTestOpConfig::get_instance().get_config("round", round);
  if (!round.empty()) { max_round_ = current_round_ + std::stoi(round); }

  return OB_SUCCESS;
}

int ObFakeTableScanVecOp::inner_get_next_batch(const int64_t max_row_cnt)
{
  int ret = OB_SUCCESS;
  clear_evaluated_flag();
  uint64_t op_id = get_spec().get_id();
  int64_t generate_random_value = 0;
  bool is_duplicate = false;

  const ObPushdownExprSpec &pd_expr_spec =
    reinterpret_cast<const ObTableScanSpec *>(&spec_)->tsc_ctdef_.scan_ctdef_.pd_expr_spec_;
  for (int j = 0; j < pd_expr_spec.access_exprs_.count(); j++) {
    ObExpr *expr = pd_expr_spec.access_exprs_.at(j);
    OB_FAIL(fill_random_data_into_expr_datum_frame(j, pd_expr_spec.access_exprs_.count(), expr,
                                                   ObTestOpConfig::get_instance().batch_size_, is_duplicate));
  }

  // random set skip
  set_random_skip(current_round_, ObTestOpConfig::get_instance().batch_size_);

  if (is_duplicate) { ObDataGenerator::get_instance().reset_temp_store(op_id, current_round_); }

  current_round_++;
  brs_.size_ = ObTestOpConfig::get_instance().batch_size_;
  if (current_round_ == max_round_) { brs_.end_ = true; }

  // print generate data
  LOG_INFO("[DG] data generated by DataGenerator in ", K(current_round_ - 1));
  if (op_id_2_output_streams_.count(op_id) == 0) {
    std::string output_file_name = "generate_data_" + std::to_string(op_id) + ".data";
    op_id_2_output_streams_[op_id].open(output_file_name.data(), std::ios::out | std::ios::trunc);
  }
  test::TestOpEngine::print_to_file(&brs_, this, pd_expr_spec.access_exprs_, false, &op_id_2_output_streams_[op_id]);

  return ret;
}

int ObFakeTableScanVecOp::fill_random_data_into_expr_datum_frame(int expr_i, int expr_count, const ObExpr *expr,
                                                                 const int output_row_count, bool &is_duplicate)
{
  int ret = OB_SUCCESS;
  LOG_DEBUG("generate_random_value for expr:  ", K(*expr));

  ObIVector *i_vec = NULL;
  ObDatum *datums = NULL;

  if (expr->enable_rich_format()) {
    // vectorization 2.0 new operator
    i_vec = expr->get_vector(eval_ctx_);
    if (expr->is_fixed_length_data_) {
      // for VEC_FIXED and VEC_UNIFORM
      // we must first call init_vector because we need to use i_vec->set_xxx() which need use vector's meta data
      OB_ASSERT(ObTestOpConfig::get_instance().digit_data_format_ == VEC_UNIFORM
                || ObTestOpConfig::get_instance().digit_data_format_ == VEC_FIXED);
      expr->init_vector(eval_ctx_, ObTestOpConfig::get_instance().digit_data_format_, output_row_count, true);
    } else {
      // while for VEC_DISCRETE and VEC_CONTINUOUS it doesn't matter
      OB_ASSERT(ObTestOpConfig::get_instance().string_data_format_ == VEC_UNIFORM
                || ObTestOpConfig::get_instance().string_data_format_ == VEC_DISCRETE
                || ObTestOpConfig::get_instance().string_data_format_ == VEC_CONTINUOUS);
      expr->init_vector(eval_ctx_, ObTestOpConfig::get_instance().string_data_format_, output_row_count, true);
    }
  } else {
    // vectorization 1.0 old operator
    datums = expr->locate_datums_for_update(eval_ctx_, output_row_count);
  }

  // generate random data
  get_random_data(expr_i, expr_count, expr, current_round_, output_row_count, expr->max_length_, is_duplicate);

  int vec_continuous_offset = 0;   // only used in VEC_CONTINUOUS
  std::string vec_continuous_data; // use to store data of VEC_CONTINUOUS temporarily

  ObDataGenerator::TempDataStore &data_store =
    ObDataGenerator::get_instance().op_2_round_2_temp_store_[get_spec().get_id()][current_round_][expr_i];
  for (int row = 0; row < output_row_count; row++) {
    bool is_null = data_store.null_[row];

    switch (expr->datum_meta_.get_type()) {
    // why expr->datum_meta_.get_type() == ObInt32Type while expr->res_buf_len_ == 8 ?????
    case ObInt32Type: {
      int data = data_store.temp_int32_vector_[row];
      if (i_vec != NULL) {
        i_vec->set_int(row, static_cast<int64_t>(data));
        if (is_null) { i_vec->set_null(row); }
      } else {
        datums[row].set_int32(data);
        if (is_null) { datums[row].set_null(); }
      }
      break;
    }
    case ObIntType: {
      int64_t data = data_store.temp_int64_vector_[row];
      if (i_vec != NULL) {
        i_vec->set_int(row, data);
        if (is_null) { i_vec->set_null(row); }
      } else {
        datums[row].set_int(data);
        if (is_null) { datums[row].set_null(); }
      }
      break;
    }
    case ObDoubleType: {
      double data = data_store.temp_double_vector_[row];
      if (i_vec != NULL) {
        i_vec->set_double(row, data);
        if (is_null) { i_vec->set_null(row); }
      } else {
        datums[row].set_double(data);
        if (is_null) { datums[row].set_null(); }
      }
      break;
    }
    case ObNumberType: {
      break;
    }
    case ObDecimalIntType: {
      // have bug
      // if decimal(20, 10)
      // expr->res_buf_len_ = 16 is *not* equal to datums[x].len_ == 8
      // where datums[x].len_ is set?
      // i_vec->set_int(row,
      //                ObDataGenerator::get_instance().round_2_temp_store_[round_].first[expr_i].temp_int64_vector_[row]);
      break;
    }
    case ObVarcharType:
    case ObCharType: {
      std::string tmp_str = data_store.temp_string_vector_[row];
      int str_len = static_cast<int>(tmp_str.size());

      if (i_vec != NULL) {
        // vectorization 1.0 new operator
        if (expr->get_format(eval_ctx_) == VEC_UNIFORM) {
          if (is_null) {
            i_vec->set_null(row);
          } else {
            i_vec->set_payload(row, tmp_str.data(), str_len);
          }
        } else {
          if (is_null) { i_vec->set_null(row); }
          if (expr->get_format(eval_ctx_) == VEC_DISCRETE) {
            char **ptrs = expr->get_discrete_vector_ptrs(eval_ctx_);
            int32_t *lens = expr->get_discrete_vector_lens(eval_ctx_);
            ptrs[row] = expr->get_res_buf(eval_ctx_) + row * (expr->res_buf_len_);
            lens[row] = str_len;

            i_vec->set_payload(row, tmp_str.data(), str_len);
          } else {
            // VEC_CONTINUOUS
            // offset[0] == 0 which is set in cg phase
            vec_continuous_offset += str_len;
            uint32_t *offset = expr->get_continuous_vector_offsets(eval_ctx_);
            offset[row + 1] = vec_continuous_offset;
            vec_continuous_data += tmp_str; // temporarily store data here
          }
        }
      } else {
        // vectorization 1.0 old operator
        if (is_null) {
          datums[row].set_null();
        } else {
          datums[row].set_string(tmp_str.data(), str_len);
        }
      }

      break;
    }
    default: LOG_INFO("Can not generate random value so far for: ", K(expr->datum_meta_.get_type()));
    }
  }

  if (expr->enable_rich_format() && expr->get_format(eval_ctx_) == VEC_CONTINUOUS && !vec_continuous_data.empty()) {
    ObDynReserveBuf *drb = reinterpret_cast<ObDynReserveBuf *>(expr->get_continuous_vector_data(eval_ctx_));
    // Todo: replace below logic when shengle handle continuous format memory
    // in ObExpr::get_str_res_mem()
    char *mem = NULL;
    const int64_t alloc_size = next_pow2(vec_continuous_data.size());
    if (OB_UNLIKELY(alloc_size > UINT32_MAX)) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid argument", K(vec_continuous_data.size()), K(alloc_size), K(ret));
    } else if (OB_ISNULL(mem = static_cast<char *>(eval_ctx_.alloc_expr_res(alloc_size)))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("allocate memory failed", K(ret), K(ret));
    } else {
      // When extend memory, the old memory can not free, because the old memory may
      // still be referenced. see: ob_datum_cast.cpp::common_copy_string
      if (0 == drb->len_) { drb->magic_ = ObDynReserveBuf::MAGIC_NUM; }
      drb->len_ = alloc_size;
      drb->mem_ = mem;
      MEMCPY(drb->mem_, vec_continuous_data.data(), vec_continuous_data.size());

      ObContinuousBase *cont_vec = static_cast<ObContinuousBase *>(expr->get_vector(eval_ctx_));
      cont_vec->set_data(drb->mem_);
    }
    LOG_DEBUG("extend expr result memory", K(ret), K(vec_continuous_data.size()), K(alloc_size), KP(this), KP(mem));
  }

  return ret;
}

int ObFakeTableScanVecOp::get_random_data(int expr_i, int expr_count, const ObExpr *expr, const int round,
                                          const int batch_size, const int len, bool &is_duplicate)
{
  int ret = OB_SUCCESS;
  uint64_t op_id = get_spec().get_id();
  switch (expr->datum_meta_.get_type()) {
  // why expr->datum_meta_.get_type() == ObInt32Type while expr->res_buf_len_ == 8 ?????
  case ObInt32Type: {
    ObDataGenerator::get_instance().generate_data<int>(op_id, expr_i, expr_count, round, batch_size, len, is_duplicate);
    break;
  }
  case ObIntType: {
    ObDataGenerator::get_instance().generate_data<int64_t>(op_id, expr_i, expr_count, round, batch_size, len,
                                                           is_duplicate);
    break;
  }
  case ObDoubleType: {
    ObDataGenerator::get_instance().generate_data<double>(op_id, expr_i, expr_count, round, batch_size, len,
                                                          is_duplicate);
    break;
  }
  case ObNumberType: {
    break;
  }
  case ObDecimalIntType: {
    // have bug
    // if decimal(20, 10)
    // expr->res_buf_len_ = 16 is *not* equal to datums[x].len_ == 8
    // where datums[x].len_ is set?
    ObDataGenerator::get_instance().generate_data<int64_t>(op_id, expr_i, expr_count, round, batch_size, len,
                                                           is_duplicate);
    break;
  }
  case ObVarcharType:
  case ObCharType: {
    ObDataGenerator::get_instance().generate_data<std::string>(op_id, expr_i, expr_count, round, batch_size, len,
                                                               is_duplicate);
    break;
  }
  default: LOG_INFO("Can not generate random value so far for: ", K(expr->datum_meta_.get_type()));
  }

  return ret;
}

void ObFakeTableScanVecOp::set_random_skip(const int round, const int batch_size)
{
  std::string generate_data_skips;
  uint64_t op_id = get_spec().get_id();
  if (ObDataGenerator::get_instance().op_2_round_2_skips_[op_id].count(round) == 0) {
    std::uniform_int_distribution<int> u_i(0, 1);
    bool if_skip;
    for (int i = 0; i < batch_size; i++) {
      if_skip =
        ObDataGenerator::get_instance().zero_one_rand_by_probability(ObTestOpConfig::get_instance().skips_probability_);
      generate_data_skips += std::to_string(!if_skip) + " ";
      ObDataGenerator::get_instance().op_2_round_2_skips_[op_id][round].push_back(if_skip);
    }
  }

  for (int i = 0; i < batch_size; i++) {
    if (ObDataGenerator::get_instance().op_2_round_2_skips_[op_id][round][i] == true) { brs_.skip_->set(i); }
  }
  LOG_INFO("skips : ", K(generate_data_skips.data()));
  return;
}

} // namespace sql
} // namespace oceanbase